import pandas as pd
import plotly.express as px
import plotly.io as pio
#pio.renderers.default = "svg"
from pyspark.sql import SparkSession
import re
import numpy as np
import plotly.graph_objects as go
from pyspark.sql.functions import col, split, explode, regexp_replace, transform, when
from pyspark.sql import functions as F
from pyspark.sql.functions import col, monotonically_increasing_id
import os
# Seed NumPy's global RNG so any random sampling is reproducible.
np.random.seed(42)

# Render Plotly figures inline with the connected notebook renderer.
pio.renderers.default = "notebook_connected"
# Assignment 03
1 Import Packages
2 Plotly Template
# Custom "nike" Plotly template: condensed-Helvetica titles, a two-color
# colorway (orange/grey), unified x-hover, and $-formatted outside bar labels.
# Note - 'family' must be a single string, NOT a list or dict!
_nike_layout = {
    # Title font
    "title": {
        "font": {
            "family": "HelveticaNeue-CondensedBold, Helvetica, Sans-serif",
            "size": 30,
            "color": "#333",
        }
    },
    # Base font for all other text elements
    "font": {
        "family": "Helvetica Neue, Helvetica, Sans-serif",
        "size": 16,
        "color": "#333",
    },
    # Default trace colors
    "colorway": ["#ec7424", "#a4abab"],
    # One shared hover label per x position
    "hovermode": "x unified",
}

# Default bar-trace styling; each graph object must be wrapped in a
# tuple or list for its trace type.
_nike_bar = go.Bar(
    texttemplate="%{value:$.2s}",
    textposition="outside",
    textfont={
        "family": "Helvetica Neue, Helvetica, Sans-serif",
        "size": 20,
        "color": "#FFFFFF",
    },
)

pio.templates["nike"] = go.layout.Template(
    layout=_nike_layout,
    data={"bar": [_nike_bar]},
)

# Make Nike the default for BOTH PX and GO:
px.defaults.template = "nike"
pio.templates.default = "nike"
# 3 Load Dataset
# Start (or reuse) a Spark session for the Lightcast postings data.
spark = SparkSession.builder.appName("LightcastData").getOrCreate()

# Load the CSV; multiLine + escape handle quoted fields containing
# embedded newlines and doubled quotes.
_csv_path = "/home/ubuntu/assignment-03-Sabrina1211/data/lightcast_job_postings.csv"
_reader = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("multiLine", "true")
    .option("escape", "\"")
)
df = _reader.csv(_csv_path)

# Register the DataFrame for Spark SQL queries later in the notebook.
df.createOrReplaceTempView("job_postings")

# Diagnostic checks — keep commented out when rendering the submission.
#df.printSchema()
#df.show(5)
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/23 20:50:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Stage 1:> (0 + 1) / 1] 25/09/23 20:51:06 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
4 Data Preparation
# Step 1: cast the salary and experience columns from string to float.
_float_columns = [
    "SALARY",
    "SALARY_FROM",
    "SALARY_TO",
    "MIN_YEARS_EXPERIENCE",
    "MAX_YEARS_EXPERIENCE",
]
for _name in _float_columns:
    df = df.withColumn(_name, col(_name).cast("float"))
# Step 2: Computing medians for salary columns
def compute_median(sdf, col_name):
    """Return the approximate median of *col_name* in Spark DataFrame *sdf*.

    Uses approxQuantile with a 1% relative error. Returns None when the
    column has no non-null values (approxQuantile yields an empty list).
    """
    quantiles = sdf.approxQuantile(col_name, [0.5], 0.01)
    if not quantiles:
        return None
    return quantiles[0]
# Step 2: compute approximate medians for the salary columns.
median_from = compute_median(df, "SALARY_FROM")
median_to = compute_median(df, "SALARY_TO")
median_salary = compute_median(df, "SALARY")
print("Medians:", median_from, median_to, median_salary)

# Step 4: impute missing salary bounds with the medians (experience is
# deliberately left un-imputed). fillna() raises if a replacement value
# is None, which happens when a column is entirely null and
# compute_median returns None — skip those entries.
_fill_values = {
    name: value
    for name, value in (("SALARY_FROM", median_from), ("SALARY_TO", median_to))
    if value is not None
}
if _fill_values:
    df = df.fillna(_fill_values)

# Step 5: average salary = midpoint of the posted salary range.
df = df.withColumn("Average_Salary", (col("SALARY_FROM") + col("SALARY_TO")) / 2)
# Step 6: keep only the columns needed for the downstream analysis.
export_cols = [
    "EDUCATION_LEVELS_NAME",
    "REMOTE_TYPE_NAME",
    "MAX_YEARS_EXPERIENCE",
    "Average_Salary",
    "SALARY",
    "LOT_V6_SPECIALIZED_OCCUPATION_NAME",
]
df_selected = df.select(*export_cols)

# Step 7: export the cleaned data to CSV via pandas.
# Ensure the target directory exists first — to_csv() raises if ./data
# is missing (previously assumed the directory was already present).
os.makedirs("./data", exist_ok=True)
pdf = df_selected.toPandas()
pdf.to_csv("./data/lightcast_cleaned.csv", index=False)
print("Data cleaning complete. Rows retained:", len(pdf))
Medians: 87295.0 130042.0 115024.0
[Stage 5:> (0 + 1) / 1]
Data cleaning complete. Rows retained: 72498
5 Salary Distribution by Employment Type
# Salary Distribution by Employment Type
# Box plot of posted salaries split by employment type, with categories
# ordered by each group's median salary (highest first). Writes HTML and
# SVG copies to ./figures.
os.makedirs("figures", exist_ok=True)
# Pull only the two needed columns; drop null and non-positive salaries.
# NOTE: this rebinds the module-level `pdf` from the cleaning step above.
pdf = (
df.select("EMPLOYMENT_TYPE_NAME", F.col("SALARY").cast("double").alias("SALARY"))
.filter((F.col("SALARY").isNotNull()) & (F.col("SALARY") > 0))
.toPandas()
)
# Normalize the category labels: fill missing values, strip non-ASCII
# characters (e.g. typographic dashes from the source data), and trim
# surrounding whitespace.
pdf["EMPLOYMENT_TYPE_NAME"] = (
pdf["EMPLOYMENT_TYPE_NAME"].astype("string").fillna("Unknown")
.str.replace(r"[^\x00-\x7F]+", "", regex=True).str.strip()
)
# Order the employment types by descending median salary.
sorted_employment_types = (
pdf.groupby("EMPLOYMENT_TYPE_NAME")["SALARY"].median()
.sort_values(ascending=False).index
)
# Make the column an ordered categorical so the plot respects the order.
pdf["EMPLOYMENT_TYPE_NAME"] = pd.Categorical(
pdf["EMPLOYMENT_TYPE_NAME"],
categories=sorted_employment_types,
ordered=True
)
fig = px.box(
pdf,
x="EMPLOYMENT_TYPE_NAME",
y="SALARY",
title="Salary Distribution by Employment Type",
points="outliers" # template will set color/style
)
# Axis styling: explicit category order, fixed 0–500K salary range with
# "K"-formatted tick labels.
fig.update_layout(
xaxis=dict(title="Employment Type",
categoryorder="array",
categoryarray=sorted_employment_types.tolist(),
tickfont=dict(size=18)),
yaxis=dict(title="Salary (K $)", range=[0, 500000],
tickvals=[0, 50_000, 100_000, 150_000, 200_000, 250_000, 300_000, 350_000, 400_000, 450_000, 500_000],
ticktext=["0","50K","100K","150K","200K","250K","300K","350K","400K","450K","500K"]),
font=dict(family="Arial", size=16),
showlegend=False,
height=500, width=850
)
# Export interactive and static copies.
fig.write_html("figures/DistributionEmploymentType.html")
fig.write_image("figures/DistributionEmploymentType.svg", width=850, height=500, scale=1)[Stage 6:> (0 + 1) / 1]
6 Salary Distribution by Industry
# Box plot of salary by 2-digit NAICS industry, exported to ./figures.
industry_pdf = df.select("NAICS2_NAME", "SALARY").toPandas()

fig = px.box(
    industry_pdf,
    x="NAICS2_NAME",
    y="SALARY",
    title="Salary Distribution by Industry",
    color_discrete_sequence=["#EF553B"],
)

# change to "plotly_white" if this template isn't available
fig.update_layout(template="nike")

# Angle the long industry names so the x-axis stays readable.
fig.update_xaxes(tickangle=45)

# fig.show()
fig.write_html("figures/DistributionIndustry.html")
fig.write_image("figures/DistributionIndustry.svg", width=1000, height=600, scale=2)
7 Salary Analysis by ONET Occupation Type (Bubble Chart)
# Step 1: Spark SQL — median salary and posting count for the ten
# most-posted occupations (queried against the job_postings temp view).
_top_occupations_sql = """
SELECT
LOT_OCCUPATION_NAME AS Occupation_Name,
PERCENTILE(SALARY, 0.5) AS Median_Salary,
COUNT(*) AS Job_Postings
FROM job_postings
GROUP BY LOT_OCCUPATION_NAME
ORDER BY Job_Postings DESC
LIMIT 10
"""
salary_analysis = spark.sql(_top_occupations_sql)

# Step 2: the aggregate is tiny (10 rows) — bring it into pandas.
salary_pd = salary_analysis.toPandas()
salary_pd.head()
| Occupation_Name | Median_Salary | Job_Postings | |
|---|---|---|---|
| 0 | Data / Data Mining Analyst | 95250.0 | 30057 |
| 1 | Business Intelligence Analyst | 125900.0 | 29445 |
| 2 | Computer Systems Engineer / Architect | 157600.0 | 8212 |
| 3 | Business / Management Analyst | 93650.0 | 4326 |
| 4 | Clinical Analyst / Clinical Documentation and ... | 89440.0 | 261 |
# Bubble chart: median salary per LOT occupation, bubble size and color
# both encode the number of postings.
fig = px.scatter(
salary_pd,
x="Occupation_Name",
y="Median_Salary",
size="Job_Postings",
title="Salary Analysis by LOT Occupation Type (Bubble Chart)",
labels={
"Occupation_Name": "LOT Occupation",
"Median_Salary": "Median Salary",
"Job_Postings": "Number of Job Postings",
},
hover_name="Occupation_Name",
size_max=60,
width=1000,
height=600,
color="Job_Postings",
color_continuous_scale="Plasma",
)
# Step 4: Layout customization
fig.update_layout(
font_family="Arial",
font_size=14,
title_font_size=25,
xaxis_title="LOT Occupation",
yaxis_title="Median Salary",
plot_bgcolor="white",
xaxis=dict(
tickangle=-45,
showline=True,
linecolor="black",
),
yaxis=dict(
showline=True,
linecolor="black",
),
)
# Step 5: Show and export
fig.show()
# NOTE(review): the .html filename says "Analsis" while the .svg below
# says "Analysis" — confirm which spelling downstream links expect.
fig.write_html("figures/salaryAnalsisbyLotOccupation.html")
fig.write_image("figures/salaryAnalysisbyLotOccupation.svg", width=1000, height=600, scale=1)8 Salary by Education Level
# Map raw education text into 4 groups
# Buckets EDUCATION_LEVELS_NAME into four ordered tiers via
# case-insensitive regex matches; rows matching none are dropped, along
# with rows missing (or non-positive) experience or average salary.
df_edu = (
df.withColumn(
"EDU_GROUP",
F.when(F.col("EDUCATION_LEVELS_NAME").rlike("(?i)Associate|GED|No Education Listed|High school"), "Associate or Lower")
.when(F.col("EDUCATION_LEVELS_NAME").rlike("(?i)Bachelor"), "Bachelor")
.when(F.col("EDUCATION_LEVELS_NAME").rlike("(?i)Master"), "Master’s")
.when(F.col("EDUCATION_LEVELS_NAME").rlike("(?i)PhD|Doctorate|professional degree"), "PhD")
.otherwise(None)
)
.filter(
F.col("EDU_GROUP").isNotNull()
& F.col("MAX_YEARS_EXPERIENCE").isNotNull()
& F.col("Average_Salary").isNotNull()
& (F.col("MAX_YEARS_EXPERIENCE") > 0)
& (F.col("Average_Salary") > 0)
)
)
# Pull the plot columns into pandas with display-friendly names.
pdf_edu = df_edu.select(
F.col("MAX_YEARS_EXPERIENCE").alias("Experience"),
F.col("Average_Salary").alias("Average_Salary"),
F.col("EDU_GROUP").alias("Education Group"),
F.col("LOT_V6_SPECIALIZED_OCCUPATION_NAME").alias("Occupation Name")
).toPandas()
# --- 2) Show table first ------------------------------------------------------
from IPython.display import display
# Preview the first five rows with formatted salary/experience values.
tbl = (
pdf_edu[["Experience","Average_Salary","Education Group","Occupation Name"]]
.rename(columns={"Experience":"MAX_EXPERIENCE","Average_Salary":"AVERAGE_SALARY",
"Education Group":"EDU_GROUP","Occupation Name":"OCCUPATION_NAME"})
.head(5)
)
display(
tbl.style
.format({"AVERAGE_SALARY": "${:,.0f}", "MAX_EXPERIENCE": "{:.1f}"})
.hide(axis="index")
)
# Plot (nike template)
# Legend/category order from lowest to highest education tier.
order = ["Associate or Lower","Bachelor","Master’s","PhD"]
fig1 = px.scatter(
pdf_edu,
x="Experience",
y="Average_Salary",
color="Education Group",
category_orders={"Education Group": order},
title="Experience vs Salary by Education Level",
labels={"Experience":"Years of Experience", "Average_Salary":"Average Salary (USD)"},
template="nike",
opacity=0.8
)
# Soft beige frame + light plotting area + subtle grid
fig1.update_layout(
paper_bgcolor="#FFF5DC", # outer background (around the plot)
plot_bgcolor="#f9f9f9", # inner plotting area
margin=dict(l=60, r=60, t=80, b=70),
xaxis=dict(
tickmode="linear", dtick=1,
gridcolor="rgba(0,0,0,0.12)", # light grid
zeroline=False
),
yaxis=dict(
gridcolor="rgba(0,0,0,0.12)",
zeroline=False
),
legend=dict(
bgcolor="rgba(255,245,220,0.7)", # optional: match the frame
bordercolor="#E6D9B6", borderwidth=1
)
)
# Uniform small markers; opacity repeated here overrides the px default.
fig1.update_traces(marker=dict(size=6, opacity=0.8))
# Show the chart
#fig1.show()
fig1.write_html("figures/SalaryEducationLevel.html")
fig1.write_image("figures/SalaryEducationLevel.svg", width=1000, height=600, scale=1)[Stage 11:> (0 + 1) / 1]
| MAX_EXPERIENCE | AVERAGE_SALARY | EDU_GROUP | OCCUPATION_NAME |
|---|---|---|---|
| 2.0 | $108,668 | Bachelor | General ERP Analyst / Consultant |
| 3.0 | $108,668 | Associate or Lower | Oracle Consultant / Analyst |
| 7.0 | $108,668 | Associate or Lower | General ERP Analyst / Consultant |
| 2.0 | $92,962 | Bachelor | Data Analyst |
| 5.0 | $108,668 | Associate or Lower | Data Analyst |